package org.codehaus.activemq.store.vm;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.JMSException;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;

public class VMTopicMessageStore extends VMMessageStore
  implements TopicMessageStore
{
  private static final Integer ONE = new Integer(1);
  private Map ackDatabase;
  private Map messageCounts;
  private Map subscriberDatabase;

  public VMTopicMessageStore()
  {
    this(new LinkedHashMap(), makeMap(), makeMap(), makeMap());
  }

  public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) {
    super(messageTable);
    this.subscriberDatabase = subscriberDatabase;
    this.ackDatabase = ackDatabase;
    this.messageCounts = messageCounts;
  }

  public void setMessageContainer(MessageContainer container) {
  }

  public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
    Integer number = (Integer)this.messageCounts.get(messageId);
    if (number == null) {
      number = ONE;
    }
    else {
      number = new Integer(number.intValue() + 1);
    }
    this.messageCounts.put(messageId, number);
  }

  public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
    Integer number = (Integer)this.messageCounts.get(messageIdentity);
    if ((number == null) || (number.intValue() <= 1)) {
      removeMessage(messageIdentity, ack);
      if (number != null)
        this.messageCounts.remove(messageIdentity);
    }
    else
    {
      this.messageCounts.put(messageIdentity, new Integer(number.intValue() - 1));
      number = ONE;
    }
  }

  public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
    String consumerId = subscription.getConsumerId();
    this.ackDatabase.put(consumerId, messageIdentity);
  }

  public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) {
  }

  public MessageIdentity getLastestMessageIdentity() throws JMSException {
    return null;
  }

  public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
    Object key = info.getConsumerKey();
    return (SubscriberEntry)this.subscriberDatabase.get(key);
  }

  public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
    this.subscriberDatabase.put(info.getConsumerKey(), subscriberEntry);
  }

  public void stop() throws JMSException {
    this.ackDatabase.clear();
    this.messageCounts.clear();
    super.stop();
  }

  protected static Map makeMap() {
    return Collections.synchronizedMap(new HashMap());
  }
}